package com.king.first.app.process;

import com.king.first.bean.WaterSensor;
import com.king.first.utils.ContentUtils;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/**
 * Demo job that window-joins two socket text streams of {@link WaterSensor} records on their id.
 *
 * <p>Two sockets on {@link ContentUtils#HOST} (ports {@value #FIRST_PORT} and
 * {@value #SECOND_PORT}) are read line by line. Each line must contain at least three
 * comma-separated fields: a string id, a long value and an integer value, in that order
 * (NOTE(review): the second field is presumably a millisecond timestamp and the third a
 * reading — confirm against {@code WaterSensor}). Records from both streams that carry the
 * same id are joined inside tumbling processing-time windows of {@value #WINDOW_SECONDS}
 * seconds, and every joined pair is printed.
 */
public class Flink_ProcessJoinFunction {

    /** Socket port feeding the first (left) input of the join. */
    private static final int FIRST_PORT = 8888;

    /** Socket port feeding the second (right) input of the join. */
    private static final int SECOND_PORT = 9999;

    /** Length, in seconds, of the tumbling processing-time window used by the join. */
    private static final long WINDOW_SECONDS = 4;

    /** Minimum number of comma-separated fields a line must contain. */
    private static final int MIN_FIELDS = 3;

    /**
     * Builds and runs the join job.
     *
     * @param args command-line arguments; not used
     * @throws IllegalStateException if the Flink job fails; the original failure is the cause
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        SingleOutputStreamOperator<WaterSensor> s1 = sensorStream(env, FIRST_PORT);
        SingleOutputStreamOperator<WaterSensor> s2 = sensorStream(env, SECOND_PORT);

        DataStream<String> result = s1.join(s2)
                .where(WaterSensor::getId)
                .equalTo(WaterSensor::getId)
                // A window assigner is mandatory for DataStream join(): only elements
                // that land in the same window are paired.
                .window(TumblingProcessingTimeWindows.of(Time.seconds(WINDOW_SECONDS)))
                .apply(new JoinFunction<WaterSensor, WaterSensor, String>() {

                    @Override
                    public String join(WaterSensor first, WaterSensor second) throws Exception {
                        return "first: " + first + ", second: " + second;
                    }
                });

        result.print();

        try {
            env.execute();
        } catch (Exception e) {
            // Propagate instead of only printing, so a failed job is not reported as a
            // normal JVM exit; the cause is preserved for diagnosis.
            throw new IllegalStateException("Flink window-join job failed", e);
        }
    }

    /**
     * Creates a stream of sensors parsed from the text lines of one socket.
     *
     * @param env  execution environment the source is registered on
     * @param port socket port on {@link ContentUtils#HOST} to read from
     * @return stream of parsed {@link WaterSensor} records
     */
    private static SingleOutputStreamOperator<WaterSensor> sensorStream(
            StreamExecutionEnvironment env, int port) {
        return env
                .socketTextStream(ContentUtils.HOST, port)
                .map(line -> parseSensor(line));
    }

    /**
     * Parses one {@code id,long,int} line into a {@link WaterSensor}.
     *
     * <p>The id is taken verbatim; the two numeric fields are trimmed before parsing so that
     * surrounding whitespace or a trailing carriage return does not break the job. Fields
     * beyond the third are ignored.
     *
     * @param line raw text line received from the socket
     * @return the parsed sensor record
     * @throws IllegalArgumentException if the line has fewer than three fields or a numeric
     *                                  field cannot be parsed
     */
    private static WaterSensor parseSensor(String line) {
        String[] fields = line.split(",");
        if (fields.length < MIN_FIELDS) {
            throw new IllegalArgumentException(
                    "Expected at least " + MIN_FIELDS + " comma-separated fields but got "
                            + fields.length + " in line: '" + line + "'");
        }
        try {
            return new WaterSensor(
                    fields[0],
                    Long.valueOf(fields[1].trim()),
                    Integer.valueOf(fields[2].trim()));
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(
                    "Non-numeric value in sensor line: '" + line + "'", e);
        }
    }
}
